home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.4)
-
- import unittest
- from test import test_support
- import socket
- import select
- import time
- import thread
- import threading
- import Queue
- import sys
- from weakref import proxy
# Port that the TCP and UDP server fixtures bind to.
PORT = 50007
# Host name used for binding the server sockets and for client connects.
HOST = 'localhost'
# Payload exchanged between the client and server halves of the tests.
MSG = 'Michael Gilfix was here\n'
-
class SocketTCPTest(unittest.TestCase):
    """Fixture providing a listening TCP socket as ``self.serv``."""

    def setUp(self):
        """Create a stream socket bound to (HOST, PORT) and start listening."""
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.serv.bind((HOST, PORT))
        self.serv.listen(1)

    def tearDown(self):
        """Close the listening socket and drop the reference to it."""
        self.serv.close()
        self.serv = None
-
-
-
class SocketUDPTest(unittest.TestCase):
    """Fixture providing a bound UDP socket as ``self.serv``."""

    def setUp(self):
        """Create a datagram socket and bind it to (HOST, PORT)."""
        self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.serv.bind((HOST, PORT))

    def tearDown(self):
        """Close the datagram socket and drop the reference to it."""
        self.serv.close()
        self.serv = None
-
-
-
class ThreadableTest:
    """Threadable Test class

    The ThreadableTest class makes it easy to create a threaded
    client/server pair from an existing unit test. To create a
    new threaded class from an existing unit test, use multiple
    inheritance:

        class NewClass (OldClass, ThreadableTest):
            pass

    This class defines two new fixture functions with obvious
    purposes for overriding:

        clientSetUp ()
        clientTearDown ()

    Any new test functions within the class must then define
    tests in pairs, where the test name is preceded with a
    '_' to indicate the client portion of the test. Ex:

        def testFoo(self):
            # Server portion

        def _testFoo(self):
            # Client portion

    Any exceptions raised by the clients during their tests
    are caught and transferred to the main thread to alert
    the testing framework.

    Note, the server setup function cannot call any blocking
    functions that rely on the client thread during setup,
    unless serverExplicitReady() is called just before
    the blocking call (such as in setting up a client/server
    connection and performing the accept() in setUp()).
    """

    def __init__(self):
        # Remember the concrete test's own fixture methods, then make the
        # framework call the threaded wrappers below instead.
        self.__setUp = self.setUp
        self.__tearDown = self.tearDown
        self.setUp = self._setUp
        self.tearDown = self._tearDown

    def serverExplicitReady(self):
        """This method allows the server to explicitly indicate that
        it wants the client thread to proceed. This is useful if the
        server is about to execute a blocking routine that is
        dependent upon the client thread during its setup routine."""
        self.server_ready.set()

    def _setUp(self):
        """Start the client thread, run the real setUp, wait for the client."""
        self.server_ready = threading.Event()
        self.client_ready = threading.Event()
        self.done = threading.Event()
        self.queue = Queue.Queue(1)

        # The client half of a test is the method with the same name
        # prefixed by '_'; derive it from the last component of the test id.
        methodname = self.id()
        i = methodname.rfind('.')
        methodname = methodname[i + 1:]
        test_method = getattr(self, '_' + methodname)
        self.client_thread = thread.start_new_thread(
            self.clientRun, (test_method,))

        self.__setUp()
        # The server setUp may already have released the client itself via
        # serverExplicitReady(); only signal if it has not.
        if not self.server_ready.isSet():
            self.server_ready.set()
        self.client_ready.wait()

    def _tearDown(self):
        """Run the real tearDown, wait for the client, report its failure."""
        self.__tearDown()
        self.done.wait()

        if not self.queue.empty():
            msg = self.queue.get()
            self.fail(msg)

    def clientRun(self, test_func):
        """Body of the client thread: set up, run ``test_func``, tear down.

        An exception raised by ``test_func`` is placed on ``self.queue`` so
        that _tearDown() can report it in the main thread.

        Raises TypeError if ``test_func`` is not callable.
        """
        self.server_ready.wait()
        self.client_ready.set()
        self.clientSetUp()
        if not callable(test_func):
            raise TypeError('test_func must be a callable function')
        try:
            test_func()
        except Exception:
            # Hand the actual exception object to the main thread; queueing
            # a placeholder would hide the reason for the client's failure.
            self.queue.put(sys.exc_info()[1])
        self.clientTearDown()

    def clientSetUp(self):
        """Client-side fixture hook; concrete classes must override it."""
        raise NotImplementedError('clientSetUp must be implemented.')

    def clientTearDown(self):
        """Signal that the client is finished and end the client thread."""
        self.done.set()
        thread.exit()
-
-
-
class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
    """TCP server fixture combined with a client thread owning ``self.cli``."""

    def __init__(self, methodName='runTest'):
        SocketTCPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's (not yet connected) stream socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def clientTearDown(self):
        """Close the client socket, then finish the client thread."""
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
-
-
-
class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
    """UDP server fixture combined with a client thread owning ``self.cli``."""

    def __init__(self, methodName='runTest'):
        SocketUDPTest.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def clientSetUp(self):
        """Create the client's datagram socket."""
        self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-
-
-
class SocketConnectedTest(ThreadedTCPSocketTest):
    """Threaded TCP fixture whose two halves are connected to each other.

    The server half accepts one connection in setUp() and keeps it as
    ``cli_conn``; the client half connects to (HOST, PORT) in clientSetUp()
    and exposes its socket as ``serv_conn``.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def setUp(self):
        ThreadedTCPSocketTest.setUp(self)
        # Release the client thread before the accept() call below.
        self.serverExplicitReady()
        accepted = self.serv.accept()
        self.cli_conn = accepted[0]

    def tearDown(self):
        self.cli_conn.close()
        self.cli_conn = None
        ThreadedTCPSocketTest.tearDown(self)

    def clientSetUp(self):
        ThreadedTCPSocketTest.clientSetUp(self)
        self.cli.connect((HOST, PORT))
        # Same socket object as self.cli, under the name the tests use.
        self.serv_conn = self.cli

    def clientTearDown(self):
        self.serv_conn.close()
        self.serv_conn = None
        ThreadedTCPSocketTest.clientTearDown(self)
-
-
-
class SocketPairTest(unittest.TestCase, ThreadableTest):
    """Threaded fixture built on socket.socketpair().

    ``self.serv`` is used by the main thread and ``self.cli`` by the
    client thread.
    """

    def __init__(self, methodName='runTest'):
        unittest.TestCase.__init__(self, methodName=methodName)
        ThreadableTest.__init__(self)

    def setUp(self):
        """Create both ends of the pair at once."""
        pair = socket.socketpair()
        self.serv, self.cli = pair

    def tearDown(self):
        """Close the server end of the pair."""
        self.serv.close()
        self.serv = None

    def clientSetUp(self):
        """Nothing to do: the client end already exists after setUp()."""
        pass

    def clientTearDown(self):
        """Close the client end, then finish the client thread."""
        self.cli.close()
        self.cli = None
        ThreadableTest.clientTearDown(self)
-
-
-
class GeneralModuleTests(unittest.TestCase):
    """Tests of socket-module functionality that need no client thread."""

    def test_weakref(self):
        """A weak proxy must stop working once its socket is released."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        p = proxy(s)
        self.assertEqual(p.fileno(), s.fileno())
        s.close()
        s = None
        try:
            p.fileno()
        except ReferenceError:
            pass
        else:
            # Only a proxy that is still usable is a failure.
            self.fail('Socket proxy still exists')

    def testSocketError(self):
        """socket.herror and socket.gaierror are caught as socket.error."""

        def raise_error(*args, **kwargs):
            raise socket.error

        def raise_herror(*args, **kwargs):
            raise socket.herror

        def raise_gaierror(*args, **kwargs):
            raise socket.gaierror

        self.failUnlessRaises(socket.error, raise_error,
                              'Error raising socket exception.')
        self.failUnlessRaises(socket.error, raise_herror,
                              'Error raising socket exception.')
        self.failUnlessRaises(socket.error, raise_gaierror,
                              'Error raising socket exception.')

    def testCrucialConstants(self):
        """Accessing each constant raises AttributeError if it is missing."""
        socket.AF_INET
        socket.SOCK_STREAM
        socket.SOCK_DGRAM
        socket.SOCK_RAW
        socket.SOCK_RDM
        socket.SOCK_SEQPACKET
        socket.SOL_SOCKET
        socket.SO_REUSEADDR

    def testHostnameRes(self):
        """Resolve the local host name forwards and backwards."""
        hostname = socket.gethostname()
        try:
            ip = socket.gethostbyname(hostname)
        except socket.error:
            # Resolution is not available here; nothing to check.
            return
        self.assert_(ip.find('.') >= 0, 'Error resolving host to ip.')
        try:
            hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
        except socket.error:
            # Reverse lookup is not available here; nothing to check.
            return
        all_host_names = [hostname, hname] + aliases
        fqhn = socket.getfqdn()
        if fqhn not in all_host_names:
            self.fail('Error testing host resolution mechanisms.')

    def testRefCountGetNameInfo(self):
        """getnameinfo() must not change the refcount of a bad argument."""
        import sys
        if hasattr(sys, 'getrefcount'):
            try:
                orig = sys.getrefcount(__name__)
                socket.getnameinfo(__name__, 0)
            except SystemError:
                if sys.getrefcount(__name__) != orig:
                    self.fail('socket.getnameinfo loses a reference')

    def testInterpreterCrash(self):
        """getnameinfo() with a malformed address tuple must not crash."""
        try:
            socket.getnameinfo(('x', 0, 0, 0), 0)
        except socket.error:
            pass

    def testNtoH(self):
        """Each byte-order function undoes itself and rejects 34-bit input."""
        sizes = {
            socket.htonl: 32,
            socket.ntohl: 32,
            socket.htons: 16,
            socket.ntohs: 16,
        }
        for func, size in sizes.items():
            # long() keeps the mask a long integer, as in the original 1L.
            mask = (long(1) << size) - 1
            for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
                self.assertEqual(i & mask, func(func(i & mask)) & mask)

            swapped = func(mask)
            self.assertEqual(swapped & mask, mask)
            self.assertRaises(OverflowError, func, long(1) << 34)

    def testGetServBy(self):
        """Look one service up by name and by port, for tcp and udp."""
        eq = self.assertEqual
        if sys.platform in ('linux2', 'freebsd4', 'freebsd5', 'freebsd6',
                            'darwin'):
            services = ('daytime', 'qotd', 'domain')
        else:
            services = ('echo', 'daytime', 'domain')

        # Use the first candidate service that has a tcp entry.
        for service in services:
            try:
                port = socket.getservbyname(service, 'tcp')
                break
            except socket.error:
                pass
        else:
            # None of the candidate services is known.
            raise socket.error

        # Same lookup with the optional protocol omitted.
        port2 = socket.getservbyname(service)
        eq(port, port2)

        # A missing udp entry is tolerated.
        try:
            udpport = socket.getservbyname(service, 'udp')
        except socket.error:
            udpport = None
        else:
            eq(udpport, port)

        # The lookup by port must give back the same service name.
        eq(socket.getservbyport(port2), service)
        eq(socket.getservbyport(port, 'tcp'), service)
        if udpport is not None:
            eq(socket.getservbyport(udpport, 'udp'), service)

    def testDefaultTimeout(self):
        """The module default timeout is applied to newly created sockets."""
        # The default is None, and new sockets inherit it.
        self.assertEqual(socket.getdefaulttimeout(), None)
        s = socket.socket()
        self.assertEqual(s.gettimeout(), None)
        s.close()

        # A numeric default is inherited as well.
        socket.setdefaulttimeout(10)
        self.assertEqual(socket.getdefaulttimeout(), 10)
        s = socket.socket()
        self.assertEqual(s.gettimeout(), 10)
        s.close()

        # Reset to None.
        socket.setdefaulttimeout(None)
        self.assertEqual(socket.getdefaulttimeout(), None)
        s = socket.socket()
        self.assertEqual(s.gettimeout(), None)
        s.close()

        # Invalid values are rejected.
        self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
        self.assertRaises(TypeError, socket.setdefaulttimeout, 'spam')

    def testIPv4toString(self):
        """inet_aton() and inet_pton(AF_INET, ...) pack dotted quads."""
        if not hasattr(socket, 'inet_pton'):
            return  # the test body needs inet_pton
        from socket import inet_aton as f, inet_pton, AF_INET
        g = lambda a: inet_pton(AF_INET, a)

        self.assertEquals('\x00\x00\x00\x00', f('0.0.0.0'))
        self.assertEquals('\xff\x00\xff\x00', f('255.0.255.0'))
        self.assertEquals('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
        self.assertEquals('\x01\x02\x03\x04', f('1.2.3.4'))
        self.assertEquals('\xff\xff\xff\xff', f('255.255.255.255'))

        self.assertEquals('\x00\x00\x00\x00', g('0.0.0.0'))
        self.assertEquals('\xff\x00\xff\x00', g('255.0.255.0'))
        self.assertEquals('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
        self.assertEquals('\xff\xff\xff\xff', g('255.255.255.255'))

    def testIPv6toString(self):
        """inet_pton(AF_INET6, ...) packs textual IPv6 addresses."""
        if not hasattr(socket, 'inet_pton'):
            return  # the test body needs inet_pton
        try:
            from socket import inet_pton, AF_INET6, has_ipv6
            if not has_ipv6:
                return
        except ImportError:
            return
        f = lambda a: inet_pton(AF_INET6, a)

        self.assertEquals('\x00' * 16, f('::'))
        self.assertEquals('\x00' * 16, f('0::0'))
        self.assertEquals('\x00\x01' + '\x00' * 14, f('1::'))
        self.assertEquals(
            'E\xefv\xcb\x00\x1aV\xef\xaf\xeb\x0b\xac\x19$\xae\xae',
            f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae'))

    def testStringToIPv4(self):
        """inet_ntoa() and inet_ntop(AF_INET, ...) format packed addresses."""
        if not hasattr(socket, 'inet_ntop'):
            return  # the test body needs inet_ntop
        from socket import inet_ntoa as f, inet_ntop, AF_INET
        g = lambda a: inet_ntop(AF_INET, a)

        self.assertEquals('1.0.1.0', f('\x01\x00\x01\x00'))
        self.assertEquals('170.85.170.85', f('\xaaU\xaaU'))
        self.assertEquals('255.255.255.255', f('\xff\xff\xff\xff'))
        self.assertEquals('1.2.3.4', f('\x01\x02\x03\x04'))

        self.assertEquals('1.0.1.0', g('\x01\x00\x01\x00'))
        self.assertEquals('170.85.170.85', g('\xaaU\xaaU'))
        self.assertEquals('255.255.255.255', g('\xff\xff\xff\xff'))

    def testStringToIPv6(self):
        """inet_ntop(AF_INET6, ...) formats packed IPv6 addresses."""
        if not hasattr(socket, 'inet_ntop'):
            return  # the test body needs inet_ntop
        try:
            from socket import inet_ntop, AF_INET6, has_ipv6
            if not has_ipv6:
                return
        except ImportError:
            return
        f = lambda a: inet_ntop(AF_INET6, a)

        self.assertEquals('::', f('\x00' * 16))
        self.assertEquals('::1', f('\x00' * 15 + '\x01'))
        self.assertEquals(
            'aef:b01:506:1001:ffff:9997:55:170',
            f('\n\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00U\x01p'))

    def testSockName(self):
        """getsockname() reports the address the socket was bound to."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(('0.0.0.0', PORT + 1))
            name = sock.getsockname()
            self.assertEqual(name, ('0.0.0.0', PORT + 1))
        finally:
            # Release the port even when an assertion fails.
            sock.close()

    def testGetSockOpt(self):
        """SO_REUSEADDR reads back as 0 on a fresh socket."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
            self.failIf(reuse != 0, 'initial mode is reuse')
        finally:
            sock.close()

    def testSetSockOpt(self):
        """SO_REUSEADDR reads back non-zero after it has been set."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
            self.failIf(reuse == 0, 'failed to set reuse mode')
        finally:
            sock.close()

    def testSendAfterClose(self):
        """send() on a closed socket that had a timeout raises socket.error."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.close()
        self.assertRaises(socket.error, sock.send, 'spam')
-
-
-
class BasicTCPTest(SocketConnectedTest):
    """Data transfer over a connected TCP pair.

    Each ``testX`` method is the server half and runs in the main thread;
    the matching ``_testX`` method is the client half.
    """

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def testRecv(self):
        """recv() with a buffer larger than the message."""
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testRecv(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecv(self):
        """recv() with a buffer smaller than the message, then the rest."""
        seg1 = self.cli_conn.recv(len(MSG) - 3)
        seg2 = self.cli_conn.recv(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecv(self):
        self.serv_conn.send(MSG)

    def testRecvFrom(self):
        """recvfrom() with a buffer larger than the message."""
        msg, addr = self.cli_conn.recvfrom(1024)
        self.assertEqual(msg, MSG)

    def _testRecvFrom(self):
        self.serv_conn.send(MSG)

    def testOverFlowRecvFrom(self):
        """recvfrom() with a buffer smaller than the message, then the rest."""
        seg1, addr = self.cli_conn.recvfrom(len(MSG) - 3)
        seg2, addr = self.cli_conn.recvfrom(1024)
        msg = seg1 + seg2
        self.assertEqual(msg, MSG)

    def _testOverFlowRecvFrom(self):
        self.serv_conn.send(MSG)

    def testSendAll(self):
        """Everything written with sendall() arrives before end-of-stream."""
        msg = ''
        # Read until recv() returns an empty string.
        while True:
            read = self.cli_conn.recv(1024)
            if not read:
                break
            msg += read
        self.assertEqual(msg, 'f' * 2048)

    def _testSendAll(self):
        big_chunk = 'f' * 2048
        self.serv_conn.sendall(big_chunk)

    def testFromFd(self):
        """A socket built by fromfd() reads from the same connection."""
        if not hasattr(socket, 'fromfd'):
            return  # the test body needs socket.fromfd
        fd = self.cli_conn.fileno()
        sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        msg = sock.recv(1024)
        self.assertEqual(msg, MSG)

    def _testFromFd(self):
        self.serv_conn.send(MSG)

    def testShutdown(self):
        """Data sent before the peer's shutdown() is still received."""
        msg = self.cli_conn.recv(1024)
        self.assertEqual(msg, MSG)

    def _testShutdown(self):
        self.serv_conn.send(MSG)
        self.serv_conn.shutdown(2)
-
-
-
class BasicUDPTest(ThreadedUDPSocketTest):
    """Datagram transfer from the client thread to the bound server socket."""

    def __init__(self, methodName='runTest'):
        ThreadedUDPSocketTest.__init__(self, methodName=methodName)

    def testSendtoAndRecv(self):
        """Server half: read one datagram with recv()."""
        received = self.serv.recv(len(MSG))
        self.assertEqual(received, MSG)

    def _testSendtoAndRecv(self):
        self.cli.sendto(MSG, 0, (HOST, PORT))

    def testRecvFrom(self):
        """Server half: read one datagram with recvfrom()."""
        received, sender = self.serv.recvfrom(len(MSG))
        self.assertEqual(received, MSG)

    def _testRecvFrom(self):
        self.cli.sendto(MSG, 0, (HOST, PORT))
-
-
-
class BasicSocketPairTest(SocketPairTest):
    """Send MSG across a socketpair() in each direction."""

    def __init__(self, methodName='runTest'):
        SocketPairTest.__init__(self, methodName=methodName)

    def testRecv(self):
        """Main thread receives what the client thread sends."""
        received = self.serv.recv(1024)
        self.assertEqual(received, MSG)

    def _testRecv(self):
        self.cli.send(MSG)

    def testSend(self):
        """Main thread sends; the client thread checks what arrives."""
        self.serv.send(MSG)

    def _testSend(self):
        received = self.cli.recv(1024)
        self.assertEqual(received, MSG)
-
-
-
class NonBlockingTCPTests(ThreadedTCPSocketTest):
    """Non-blocking accept/recv on the server side of a threaded TCP pair.

    Each ``testX`` method is the server half; ``_testX`` is the client half.
    """

    def __init__(self, methodName='runTest'):
        ThreadedTCPSocketTest.__init__(self, methodName=methodName)

    def testSetBlocking(self):
        """accept() on a non-blocking socket returns in under a second."""
        self.serv.setblocking(0)
        start = time.time()
        try:
            self.serv.accept()
        except socket.error:
            pass
        end = time.time()
        self.assert_(end - start < 1.0, 'Error setting non-blocking mode.')

    def _testSetBlocking(self):
        pass

    def testAccept(self):
        """Non-blocking accept() fails first, then succeeds after select()."""
        self.serv.setblocking(0)
        try:
            conn, addr = self.serv.accept()
        except socket.error:
            pass
        else:
            # The client sleeps before connecting, so an immediate accept
            # is not expected to succeed.
            self.fail('Error trying to do non-blocking accept.')
        read, write, err = select.select([self.serv], [], [])
        if self.serv in read:
            conn, addr = self.serv.accept()
        else:
            self.fail('Error trying to do accept after select.')

    def _testAccept(self):
        time.sleep(0.1)
        self.cli.connect((HOST, PORT))

    def testConnect(self):
        """Server half of the connect test: accept the client's connection."""
        conn, addr = self.serv.accept()

    def _testConnect(self):
        self.cli.settimeout(10)
        self.cli.connect((HOST, PORT))

    def testRecv(self):
        """Non-blocking recv() fails first, then succeeds after select()."""
        conn, addr = self.serv.accept()
        conn.setblocking(0)
        try:
            msg = conn.recv(len(MSG))
        except socket.error:
            pass
        else:
            # The client sleeps before sending, so an immediate recv is
            # not expected to succeed.
            self.fail('Error trying to do non-blocking recv.')
        read, write, err = select.select([conn], [], [])
        if conn in read:
            msg = conn.recv(len(MSG))
            self.assertEqual(msg, MSG)
        else:
            self.fail('Error during select call to non-blocking socket.')

    def _testRecv(self):
        self.cli.connect((HOST, PORT))
        time.sleep(0.1)
        self.cli.send(MSG)
-
-
-
class FileObjectClassTestCase(SocketConnectedTest):
    """Tests of the file objects returned by socket.makefile().

    The server half reads through ``serv_file`` (opened 'rb' with
    ``bufsize``); the client half writes through ``cli_file`` (opened 'wb').
    """

    # Buffer size handed to makefile() for the reading side; subclasses
    # override it to repeat the tests with other buffering modes.
    bufsize = -1

    def __init__(self, methodName='runTest'):
        SocketConnectedTest.__init__(self, methodName=methodName)

    def setUp(self):
        SocketConnectedTest.setUp(self)
        self.serv_file = self.cli_conn.makefile('rb', self.bufsize)

    def tearDown(self):
        self.serv_file.close()
        self.assert_(self.serv_file.closed)
        self.serv_file = None
        SocketConnectedTest.tearDown(self)

    def clientSetUp(self):
        SocketConnectedTest.clientSetUp(self)
        self.cli_file = self.serv_conn.makefile('wb')

    def clientTearDown(self):
        self.cli_file.close()
        self.assert_(self.cli_file.closed)
        self.cli_file = None
        SocketConnectedTest.clientTearDown(self)

    def testSmallRead(self):
        """Two sized reads that together cover the message."""
        first_seg = self.serv_file.read(len(MSG) - 3)
        second_seg = self.serv_file.read(3)
        msg = first_seg + second_seg
        self.assertEqual(msg, MSG)

    def _testSmallRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testFullRead(self):
        """read() without a size returns the whole message."""
        msg = self.serv_file.read()
        self.assertEqual(msg, MSG)

    def _testFullRead(self):
        self.cli_file.write(MSG)
        self.cli_file.close()

    def testUnbufferedRead(self):
        """Reading one character at a time reassembles the message."""
        buf = ''
        # Read until read(1) returns an empty string.
        while True:
            char = self.serv_file.read(1)
            if not char:
                break
            buf += char
        self.assertEqual(buf, MSG)

    def _testUnbufferedRead(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testReadline(self):
        """readline() returns the newline-terminated message."""
        line = self.serv_file.readline()
        self.assertEqual(line, MSG)

    def _testReadline(self):
        self.cli_file.write(MSG)
        self.cli_file.flush()

    def testClosedAttr(self):
        """The reading file object reports closed == False while in use."""
        self.assert_(not self.serv_file.closed)

    def _testClosedAttr(self):
        self.assert_(not self.cli_file.closed)
-
-
-
class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==0.

    In this case (and in this case only), it should be possible to
    create a file object, read a line from it, create another file
    object, read another line from it, without loss of data in the
    first file object's buffer.  Note that httplib relies on this
    when reading multiple requests from the same socket."""

    bufsize = 0

    def testUnbufferedReadline(self):
        """Read line A, reopen the file object, then read line B."""
        first = self.serv_file.readline()
        self.assertEqual(first, 'A. ' + MSG)
        # A second file object over the same connection.
        self.serv_file = self.cli_conn.makefile('rb', 0)
        second = self.serv_file.readline()
        self.assertEqual(second, 'B. ' + MSG)

    def _testUnbufferedReadline(self):
        self.cli_file.write('A. ' + MSG)
        self.cli_file.write('B. ' + MSG)
        self.cli_file.flush()
-
-
-
class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==1."""

    bufsize = 1
-
-
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
    """Repeat the tests from FileObjectClassTestCase with bufsize==2."""

    bufsize = 2
-
-
class TCPTimeoutTest(SocketTCPTest):
    """Timeout behaviour of accept() on the listening TCP socket."""

    def testTCPTimeout(self):
        """accept() with a 1.0 second timeout raises socket.timeout."""

        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.accept()

        self.failUnlessRaises(socket.timeout, raise_timeout,
                              'Error generating a timeout exception (TCP)')

    def testTimeoutZero(self):
        """accept() with a zero timeout raises socket.error, not timeout."""
        got_error = False
        try:
            self.serv.settimeout(0.0)
            foo = self.serv.accept()
        except socket.timeout:
            self.fail('caught timeout instead of error (TCP)')
        except socket.error:
            got_error = True
        except:
            self.fail('caught unexpected exception (TCP)')
        if not got_error:
            self.fail('accept() returned success when we did not expect it')
-
-
-
-
class UDPTimeoutTest(SocketUDPTest):
    """Timeout behaviour of recv() on the bound UDP socket.

    The fixture is SocketUDPTest so that ``self.serv`` really is a datagram
    socket; with the TCP fixture these tests would call recv() on a
    listening stream socket instead.
    """

    def testUDPTimeout(self):
        """recv() with a 1.0 second timeout raises socket.timeout."""

        def raise_timeout(*args, **kwargs):
            self.serv.settimeout(1.0)
            self.serv.recv(1024)

        self.failUnlessRaises(socket.timeout, raise_timeout,
                              'Error generating a timeout exception (UDP)')

    def testTimeoutZero(self):
        """recv() with a zero timeout raises socket.error, not timeout."""
        ok = False
        try:
            self.serv.settimeout(0.0)
            foo = self.serv.recv(1024)
        except socket.timeout:
            self.fail('caught timeout instead of error (UDP)')
        except socket.error:
            ok = True
        except:
            self.fail('caught unexpected exception (UDP)')
        if not ok:
            self.fail('recv() returned success when we did not expect it')
-
-
-
-
class TestExceptions(unittest.TestCase):
    """Checks on the socket exception class hierarchy."""

    def testExceptionTree(self):
        """Each socket exception derives from its expected base class."""
        expected = (
            (socket.error, Exception),
            (socket.herror, socket.error),
            (socket.gaierror, socket.error),
            (socket.timeout, socket.error),
        )
        for derived, base in expected:
            self.assert_(issubclass(derived, base))
-
-
-
def test_main():
    """Collect the test classes that apply here and run them."""
    tests = [GeneralModuleTests, BasicTCPTest, TCPTimeoutTest, TestExceptions]

    # The UDP classes are left out when sys.platform is 'mac'.
    if sys.platform != 'mac':
        tests += [BasicUDPTest, UDPTimeoutTest]

    tests += [
        NonBlockingTCPTests,
        FileObjectClassTestCase,
        UnbufferedFileObjectClassTestCase,
        LineBufferedFileObjectClassTestCase,
        SmallBufferedFileObjectClassTestCase,
    ]

    # The socketpair tests need socket.socketpair to exist.
    if hasattr(socket, 'socketpair'):
        tests.append(BasicSocketPairTest)

    test_support.run_unittest(*tests)


if __name__ == '__main__':
    test_main()
-
-